今天我們要專注於監控與日誌(log)分析。前陣子介紹 AWS Bedrock 和
SageMaker 時,我們只零星提到監控相關的執行與實作;今天則用一整篇的篇幅說明監控與相關操作,
並介紹一些可觀測性(observability)架構。在實務上,我們必須同時掌握模型的健康狀態
以及性能表現。
在這當中,監控和日誌分析扮演著關鍵角色。
首先從基本監控開始,我們使用 CloudWatch 來達成這個目的。
讓我們為 SageMaker 端點設定基本的 CloudWatch 監控:
import json
import time
from datetime import datetime, timedelta, timezone

import boto3
# Initialise the module-level boto3 clients shared by the helpers in this file.
# Both are created with boto3's default session configuration.
cloudwatch = boto3.client('cloudwatch')
sagemaker = boto3.client('sagemaker')
def get_endpoint_metrics(endpoint_name, metric_name, period=300,
                         variant_name='AllTraffic', hours=1):
    """Fetch recent CloudWatch datapoints for a SageMaker endpoint metric.

    SageMaker publishes endpoint invocation metrics (``Invocations``,
    ``ModelLatency``, ``Invocation5XXErrors`` ...) with *both* the
    ``EndpointName`` and ``VariantName`` dimensions, and CloudWatch does not
    aggregate across dimensions, so the variant must be part of the query.

    Args:
        endpoint_name: Name of the SageMaker endpoint.
        metric_name: Metric in the ``AWS/SageMaker`` namespace.
        period: Datapoint granularity in seconds.
        variant_name: Production variant to query. Pass ``None`` to query
            with the ``EndpointName`` dimension only.
        hours: Size of the look-back window, in hours, ending now.

    Returns:
        A list of datapoint dicts ordered by ``Timestamp``; each carries the
        ``Average``, ``Maximum``, ``Minimum`` and ``Sum`` statistics.
    """
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=hours)

    dimensions = [{'Name': 'EndpointName', 'Value': endpoint_name}]
    if variant_name is not None:
        dimensions.append({'Name': 'VariantName', 'Value': variant_name})

    response = cloudwatch.get_metric_statistics(
        Namespace='AWS/SageMaker',
        MetricName=metric_name,
        Dimensions=dimensions,
        StartTime=start_time,
        EndTime=end_time,
        Period=period,
        # 'Sum' is needed for count-style metrics (invocations, errors).
        Statistics=['Average', 'Maximum', 'Minimum', 'Sum'],
    )
    # CloudWatch returns datapoints in no particular order.
    return sorted(response['Datapoints'], key=lambda point: point['Timestamp'])
# Key metrics: the endpoint to inspect (placeholder name, replace with a real one).
endpoint_name = 'your-endpoint-name'
# Fetch the datapoints for each metric of interest.
invocations = get_endpoint_metrics(endpoint_name, 'Invocations')
model_latency = get_endpoint_metrics(endpoint_name, 'ModelLatency')
overhead_latency = get_endpoint_metrics(endpoint_name, 'OverheadLatency')
# Report how many datapoints came back (overhead_latency is fetched but not printed).
print(f"呼叫次數: {len(invocations)} 個數據點")
print(f"模型延遲: {len(model_latency)} 個數據點")
def create_monitoring_dashboard(endpoint_name, dashboard_name,
                                variant_name='AllTraffic', region=None):
    """Create a CloudWatch dashboard for one SageMaker endpoint.

    The dashboard has three metric widgets: model latency (average and p99),
    requests versus 4XX/5XX errors, and CPU/memory utilisation.

    Args:
        endpoint_name: Endpoint whose metrics are charted.
        dashboard_name: Name of the dashboard passed to ``put_dashboard``.
        variant_name: Production variant used for the ``VariantName``
            dimension. Pass ``None`` to omit that dimension.
        region: Region the widgets read from. Defaults to the region of the
            module-level ``cloudwatch`` client.
    """
    if region is None:
        region = cloudwatch.meta.region_name

    # Every row must carry the endpoint dimensions, otherwise the widgets
    # have nothing to plot.
    dimensions = ["EndpointName", endpoint_name]
    if variant_name is not None:
        dimensions += ["VariantName", variant_name]

    def metric(namespace, metric_name, stat):
        """Build one fully-qualified metric row for a widget."""
        return [namespace, metric_name, *dimensions, {"stat": stat}]

    def widget(title, y_label, stat, metrics):
        """Build one metric widget with a labelled left Y axis."""
        return {
            "type": "metric",
            "properties": {
                "metrics": metrics,
                "period": 300,
                "stat": stat,
                "region": region,
                "title": title,
                "yAxis": {"left": {"label": y_label}},
            },
        }

    invocation_ns = "AWS/SageMaker"
    # Instance-level metrics live in a separate namespace.
    instance_ns = "/aws/sagemaker/Endpoints"

    dashboard_body = {
        "widgets": [
            # ModelLatency is reported in microseconds, hence the axis label.
            widget("模型延遲", "微秒", "Average", [
                metric(invocation_ns, "ModelLatency", "Average"),
                metric(invocation_ns, "ModelLatency", "p99"),
            ]),
            widget("請求與錯誤", "次數", "Sum", [
                metric(invocation_ns, "Invocations", "Sum"),
                metric(invocation_ns, "Invocation4XXErrors", "Sum"),
                metric(invocation_ns, "Invocation5XXErrors", "Sum"),
            ]),
            widget("資源使用率", "百分比", "Average", [
                metric(instance_ns, "CPUUtilization", "Average"),
                metric(instance_ns, "MemoryUtilization", "Average"),
            ]),
        ]
    }

    cloudwatch.put_dashboard(
        DashboardName=dashboard_name,
        DashboardBody=json.dumps(dashboard_body),
    )
    print(f"儀表板 '{dashboard_name}' 創建成功!")
# Create the dashboard for the endpoint configured in `endpoint_name`.
create_monitoring_dashboard(endpoint_name, 'SageMaker-Endpoint-Monitor')
當然,這個儀表板也可以直接在 AWS CloudWatch 主控台頁面手動建立,這裡就不另外示範。
def create_latency_alarm(endpoint_name, threshold_ms=1000,
                         variant_name='AllTraffic'):
    """Create a CloudWatch alarm on the endpoint's average model latency.

    The alarm is named ``<endpoint_name>-high-latency`` and fires when the
    5-minute average of ``ModelLatency`` stays above the threshold for two
    consecutive periods. Missing data is treated as not breaching.

    Args:
        endpoint_name: Endpoint to watch.
        threshold_ms: Latency threshold in milliseconds.
        variant_name: Production variant used for the ``VariantName``
            dimension. Pass ``None`` to omit that dimension.
    """
    alarm_name = f'{endpoint_name}-high-latency'

    dimensions = [{'Name': 'EndpointName', 'Value': endpoint_name}]
    if variant_name is not None:
        dimensions.append({'Name': 'VariantName', 'Value': variant_name})

    cloudwatch.put_metric_alarm(
        AlarmName=alarm_name,
        ComparisonOperator='GreaterThanThreshold',
        EvaluationPeriods=2,
        MetricName='ModelLatency',
        Namespace='AWS/SageMaker',
        Period=300,
        Statistic='Average',
        # ModelLatency is published in microseconds; convert the ms threshold.
        Threshold=threshold_ms * 1000,
        ActionsEnabled=True,
        AlarmDescription=f'當模型延遲超過 {threshold_ms}ms 時觸發',
        Dimensions=dimensions,
        TreatMissingData='notBreaching',
    )
    print(f"延遲告警 '{alarm_name}' 創建成功!")
def create_error_alarm(endpoint_name, error_threshold=10,
                       variant_name='AllTraffic'):
    """Create a CloudWatch alarm on the endpoint's 5XX error count.

    The alarm is named ``<endpoint_name>-high-errors`` and fires when the sum
    of ``Invocation5XXErrors`` within one 5-minute period exceeds
    ``error_threshold``. This is an absolute count, not an error rate.

    Args:
        endpoint_name: Endpoint to watch.
        error_threshold: Number of 5XX errors per period that must be
            exceeded for the alarm to fire.
        variant_name: Production variant used for the ``VariantName``
            dimension. Pass ``None`` to omit that dimension.
    """
    alarm_name = f'{endpoint_name}-high-errors'

    dimensions = [{'Name': 'EndpointName', 'Value': endpoint_name}]
    if variant_name is not None:
        dimensions.append({'Name': 'VariantName', 'Value': variant_name})

    cloudwatch.put_metric_alarm(
        AlarmName=alarm_name,
        ComparisonOperator='GreaterThanThreshold',
        EvaluationPeriods=1,
        MetricName='Invocation5XXErrors',
        Namespace='AWS/SageMaker',
        Period=300,
        Statistic='Sum',
        Threshold=error_threshold,
        ActionsEnabled=True,
        AlarmDescription=f'當 5XX 錯誤超過 {error_threshold} 次時觸發',
        Dimensions=dimensions,
        # Same policy as the latency alarm: an idle endpoint is not an error.
        TreatMissingData='notBreaching',
    )
    print(f"錯誤告警 '{alarm_name}' 創建成功!")
# Create both alarms for the endpoint: latency (threshold_ms=500) and
# 5XX errors (error_threshold=5).
create_latency_alarm(endpoint_name, threshold_ms=500)
create_error_alarm(endpoint_name, error_threshold=5)
以上是針對關鍵指標設定告警。
AWS 的 SNS(Simple Notification Service)服務可以在告警觸發時發出通知,例如寄送電子郵件,
而且可以直接與 CloudWatch 告警整合。
def setup_alarm_notification(alarm_name, sns_topic_arn):
    """Attach an SNS topic as an alarm action on an existing metric alarm.

    ``put_metric_alarm`` replaces the whole alarm definition, so the current
    configuration is read back first and re-submitted with the topic added to
    ``AlarmActions``. Adding a topic that is already attached is a no-op.

    Args:
        alarm_name: Name of an existing CloudWatch metric alarm.
        sns_topic_arn: ARN of the SNS topic to notify when the alarm fires.

    Raises:
        ValueError: If no metric alarm named ``alarm_name`` exists.
    """
    found = cloudwatch.describe_alarms(AlarmNames=[alarm_name])['MetricAlarms']
    if not found:
        raise ValueError(f"找不到告警: {alarm_name}")
    current = found[0]

    # describe_alarms also returns read-only state fields (ARN, state,
    # timestamps ...); only these keys are accepted by put_metric_alarm.
    writable_keys = (
        'AlarmName', 'AlarmDescription', 'ActionsEnabled', 'OKActions',
        'InsufficientDataActions', 'MetricName', 'Namespace', 'Statistic',
        'ExtendedStatistic', 'Dimensions', 'Period', 'Unit',
        'EvaluationPeriods', 'DatapointsToAlarm', 'Threshold',
        'ComparisonOperator', 'TreatMissingData',
        'EvaluateLowSampleCountPercentile', 'Metrics', 'ThresholdMetricId',
    )
    params = {}
    for key in writable_keys:
        value = current.get(key)
        # Skip unset/empty fields so they are not sent back as explicit values.
        if value is not None and value != '' and value != []:
            params[key] = value

    actions = list(current.get('AlarmActions', []))
    if sns_topic_arn not in actions:
        actions.append(sns_topic_arn)
    params['AlarmActions'] = actions

    cloudwatch.put_metric_alarm(**params)
# Create the SNS topic that alarm notifications will be published to.
sns = boto3.client('sns')
topic_response = sns.create_topic(Name='sagemaker-alerts')
topic_arn = topic_response['TopicArn']

# Subscribe an e-mail address (placeholder) to the topic.
sns.subscribe(Endpoint='your-email@example.com', Protocol='email', TopicArn=topic_arn)

print(f"SNS 主題創建成功: {topic_arn}")
def enable_bedrock_logging(model_id, log_group_name, role_arn=None,
                           retention_days=7):
    """Prepare a CloudWatch log group and enable Bedrock invocation logging.

    The log group is created if it does not exist and its retention policy is
    set. When ``role_arn`` is supplied, Bedrock model-invocation logging is
    then pointed at that log group. Without a role only the log group is
    prepared, because Bedrock needs a role to write to CloudWatch Logs.

    Args:
        model_id: Kept for backward compatibility; not used. The logging
            configuration call takes no model identifier.
        log_group_name: CloudWatch Logs group that receives the logs.
        role_arn: IAM role Bedrock assumes to write to the log group.
        retention_days: Retention period applied to the log group.

    Returns:
        ``log_group_name``.
    """
    logs = boto3.client('logs')

    try:
        logs.create_log_group(logGroupName=log_group_name)
    except logs.exceptions.ResourceAlreadyExistsException:
        print(f"日誌群組 '{log_group_name}' 已存在")
    else:
        print(f"日誌群組 '{log_group_name}' 創建成功")

    logs.put_retention_policy(
        logGroupName=log_group_name,
        retentionInDays=retention_days,
    )

    if role_arn is None:
        print("未提供 role_arn,略過 Bedrock 呼叫日誌設定")
        return log_group_name

    # NOTE: this replaces any invocation-logging configuration already set.
    bedrock = boto3.client('bedrock')
    bedrock.put_model_invocation_logging_configuration(
        loggingConfig={
            'cloudWatchConfig': {
                'logGroupName': log_group_name,
                'roleArn': role_arn,
            },
            'textDataDeliveryEnabled': True,
        }
    )
    return log_group_name
# Prepare the log group used for Claude invocation logs and keep its name.
log_group = enable_bedrock_logging(model_id='anthropic.claude-v2', log_group_name='/aws/bedrock/claude-invocations')
使用 CloudWatch Logs Insights 進行日誌分析
def analyze_bedrock_logs(log_group_name, hours=24, timeout_seconds=60,
                         poll_interval=1.0):
    """Count error log lines in 5-minute buckets with Logs Insights.

    Starts a Logs Insights query over the last ``hours`` hours that keeps
    messages matching ``/error/`` and counts them per 5-minute bin, then
    polls until the query reaches a terminal state.

    Args:
        log_group_name: Log group to query.
        hours: Size of the look-back window, in hours, ending now.
        timeout_seconds: Maximum time to wait for the query to finish.
        poll_interval: Seconds to sleep between status polls.

    Returns:
        The ``results`` list of the completed query.

    Raises:
        RuntimeError: If the query ends as Failed, Cancelled or Timeout.
        TimeoutError: If the query is still running after ``timeout_seconds``.
    """
    logs = boto3.client('logs')

    query = (
        "\n"
        "fields @timestamp, @message\n"
        "| filter @message like /error/\n"
        "| stats count() as error_count by bin(5m)\n"
        "| sort @timestamp desc\n"
    )

    # Read the clock once so both bounds describe exactly `hours` hours.
    now = datetime.now(timezone.utc)
    start_time = int((now - timedelta(hours=hours)).timestamp())
    end_time = int(now.timestamp())

    response = logs.start_query(
        logGroupName=log_group_name,
        startTime=start_time,
        endTime=end_time,
        queryString=query,
    )
    query_id = response['queryId']

    deadline = time.monotonic() + timeout_seconds
    while True:
        result = logs.get_query_results(queryId=query_id)
        status = result['status']
        if status == 'Complete':
            return result['results']
        if status == 'Failed':
            raise RuntimeError("查詢失敗")
        if status in ('Cancelled', 'Timeout'):
            # These are terminal too; polling further would never finish.
            raise RuntimeError(f"查詢未完成: {status}")
        if time.monotonic() >= deadline:
            # Stop the abandoned query so it does not keep running.
            logs.stop_query(queryId=query_id)
            raise TimeoutError(f"查詢逾時: {query_id}")
        time.sleep(poll_interval)
# Run the analysis over the last hour and print at most the first ten rows.
results = analyze_bedrock_logs(
    '/aws/bedrock/claude-invocations',
    hours=1,
)
for result in results[0:10]:
    print(result)
以下是幾個常用的 Logs Insights 查詢:
# Query 1: token usage statistics (total and average input/output tokens).
# NOTE(review): assumes log events expose top-level `inputTokens` /
# `outputTokens` fields; confirm against the actual log schema.
token_usage_query = """
fields @timestamp, inputTokens, outputTokens
| stats sum(inputTokens) as total_input,
sum(outputTokens) as total_output,
avg(inputTokens) as avg_input,
avg(outputTokens) as avg_output
"""
# Query 2: latency analysis (average, max, p95 and p99 per 5-minute bin).
# NOTE(review): assumes a `latency` field exists in the log events.
latency_query = """
fields @timestamp, latency
| stats avg(latency) as avg_latency,
max(latency) as max_latency,
pct(latency, 95) as p95_latency,
pct(latency, 99) as p99_latency
by bin(5m)
"""
# Query 3: error-rate trend (share of events with statusCode >= 400 per 15 minutes).
# NOTE(review): verify that Logs Insights accepts a comparison inside sum()
# and the `bin` field reference in the final `fields` stage.
error_rate_query = """
fields @timestamp, statusCode
| stats count() as total,
sum(statusCode >= 400) as errors
by bin(15m)
| fields bin, errors / total * 100 as error_rate
"""
def log_custom_metric(metric_name, value, unit='None', dimensions=None):
    """Publish a single datapoint to the ``CustomAI/Metrics`` namespace.

    Args:
        metric_name: Name of the custom metric.
        value: Value of the datapoint.
        unit: CloudWatch unit string for the datapoint.
        dimensions: Optional list of ``{'Name': ..., 'Value': ...}`` dicts;
            only attached when truthy.
    """
    datum = dict(
        MetricName=metric_name,
        Value=value,
        Unit=unit,
        Timestamp=datetime.utcnow(),
    )
    if dimensions:
        datum.update(Dimensions=dimensions)

    cloudwatch.put_metric_data(MetricData=[datum], Namespace='CustomAI/Metrics')
# Example: record the model's prediction confidence as a custom metric.
def track_prediction_confidence(endpoint_name, confidence_score):
    """Publish ``confidence_score`` as ``PredictionConfidence`` for an endpoint.

    Args:
        endpoint_name: Value used for the ``EndpointName`` dimension.
        confidence_score: Confidence value to record.
    """
    endpoint_dimension = {'Name': 'EndpointName', 'Value': endpoint_name}
    log_custom_metric(
        'PredictionConfidence',
        confidence_score,
        unit='None',
        dimensions=[endpoint_dimension],
    )
# Usage example: record a confidence of 0.95 for endpoint 'my-endpoint'.
track_prediction_confidence('my-endpoint', 0.95)
import json
import logging
# Structured (JSON) logging for inference traffic.
class StructuredLogger:
    """Emit inference and error events as single-line JSON log records.

    Records go through the standard ``logging`` logger named after this
    module. Shipping them to CloudWatch Logs requires attaching a handler
    that targets ``log_group_name`` (for example the third-party
    ``watchtower.CloudWatchLogHandler``); this class does not do that itself.
    """

    def __init__(self, log_group_name):
        """Set up the logger.

        Args:
            log_group_name: Target CloudWatch log group, stored on the
                instance for whoever attaches a CloudWatch handler.
        """
        self.log_group_name = log_group_name
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.INFO)
        # With no handler anywhere in the hierarchy, logging falls back to a
        # WARNING-level last-resort handler and INFO records vanish silently.
        if not self.logger.hasHandlers():
            self.logger.addHandler(logging.StreamHandler())

    @staticmethod
    def _utc_timestamp():
        """Return the current UTC time as a naive ISO-8601 string."""
        # Same text format as the deprecated datetime.utcnow().isoformat().
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    def _emit(self, level, log_entry):
        """Serialise ``log_entry`` to JSON, log it at ``level``, return it."""
        self.logger.log(level, json.dumps(log_entry))
        return log_entry

    def log_inference(self, endpoint_name, input_data, output, latency):
        """Record a successful inference request at INFO level.

        Only the lengths of ``str(input_data)`` and ``str(output)`` are
        logged, not the payloads themselves.

        Returns:
            The dict that was logged.
        """
        return self._emit(logging.INFO, {
            'timestamp': self._utc_timestamp(),
            'event_type': 'inference',
            'endpoint_name': endpoint_name,
            'input_length': len(str(input_data)),
            'output_length': len(str(output)),
            'latency_ms': latency,
            'status': 'success',
        })

    def log_error(self, endpoint_name, error_message, error_type):
        """Record a failed request at ERROR level.

        Returns:
            The dict that was logged.
        """
        return self._emit(logging.ERROR, {
            'timestamp': self._utc_timestamp(),
            'event_type': 'error',
            'endpoint_name': endpoint_name,
            'error_type': error_type,
            'error_message': error_message,
            'status': 'failed',
        })
# Usage example: log one sample inference (latency value 150) for 'my-endpoint'.
logger = StructuredLogger('/aws/sagemaker/inference-logs')
logger.log_inference('my-endpoint', {'text': 'sample'}, {'result': 0.9}, 150)
AWS 提供 Cost Explorer 服務來追蹤各項服務的成本,
可以參考以下範例進行整合:
def get_service_costs(service_name, days=7):
    """Return the daily unblended cost of one AWS service from Cost Explorer.

    Args:
        service_name: Value of the ``SERVICE`` dimension to filter on, for
            example ``'Amazon SageMaker'``.
        days: Number of days to look back from today (UTC); at least 1.

    Returns:
        The concatenated ``ResultsByTime`` entries of every result page.

    Raises:
        ValueError: If ``days`` is smaller than 1, which would produce an
            empty or inverted time period.
    """
    if days < 1:
        raise ValueError("days 必須至少為 1")

    ce = boto3.client('ce')  # Cost Explorer

    end_date = datetime.now(timezone.utc).date()
    start_date = end_date - timedelta(days=days)

    request = {
        'TimePeriod': {
            'Start': start_date.strftime('%Y-%m-%d'),
            'End': end_date.strftime('%Y-%m-%d'),
        },
        'Granularity': 'DAILY',
        'Metrics': ['UnblendedCost'],
        'Filter': {
            'Dimensions': {
                'Key': 'SERVICE',
                'Values': [service_name],
            }
        },
    }

    results = []
    while True:
        response = ce.get_cost_and_usage(**request)
        results.extend(response['ResultsByTime'])
        # Follow the pagination token so long windows are not truncated.
        next_token = response.get('NextPageToken')
        if not next_token:
            return results
        request['NextPageToken'] = next_token
# Fetch the SageMaker costs and print one line per day.
sagemaker_costs = get_service_costs('Amazon SageMaker')
print("SageMaker 每日成本:")
for item in sagemaker_costs:
    date = item['TimePeriod']['Start']
    cost = item['Total']['UnblendedCost']['Amount']
    print("{}: ${:.2f}".format(date, float(cost)))

# Fetch the Bedrock costs as well (kept for later use, not printed here).
bedrock_costs = get_service_costs('Amazon Bedrock')
同樣地,我們也為成本設定告警(alarm),在花費接近預算時提醒團隊『你花太多錢了😎』
def create_cost_alarm(budget_amount, service_name,
                      notify_email='your-email@example.com',
                      threshold_percent=80):
    """Create a monthly cost budget for one service with an e-mail alert.

    The budget is named ``<service_name>-monthly-budget``. A notification is
    sent when actual spend exceeds ``threshold_percent`` percent of the
    budget. Failures are reported on stdout rather than raised.

    Args:
        budget_amount: Monthly budget limit in USD.
        service_name: Service the budget is filtered to.
        notify_email: Address that receives the notification.
        threshold_percent: Percentage of the budget that triggers the alert.

    Returns:
        ``True`` if the budget was created, ``False`` otherwise.
    """
    budgets = boto3.client('budgets')
    account_id = boto3.client('sts').get_caller_identity()['Account']

    budget_name = f'{service_name}-monthly-budget'
    budget = {
        'BudgetName': budget_name,
        'BudgetLimit': {
            'Amount': str(budget_amount),
            'Unit': 'USD',
        },
        'TimeUnit': 'MONTHLY',
        'BudgetType': 'COST',
        'CostFilters': {
            'Service': [service_name],
        },
    }
    notification = {
        'NotificationType': 'ACTUAL',
        'ComparisonOperator': 'GREATER_THAN',
        'Threshold': threshold_percent,
        'ThresholdType': 'PERCENTAGE',
    }
    subscriber = {
        'SubscriptionType': 'EMAIL',
        'Address': notify_email,
    }

    try:
        budgets.create_budget(
            AccountId=account_id,
            Budget=budget,
            NotificationsWithSubscribers=[
                {
                    'Notification': notification,
                    'Subscribers': [subscriber],
                }
            ],
        )
    except budgets.exceptions.DuplicateRecordException:
        # Re-running the script is expected; say so instead of a generic error.
        print(f"設定失敗: 預算 '{budget_name}' 已存在")
        return False
    except Exception as e:
        # Deliberately best-effort, as before: report and carry on.
        print(f"設定失敗: {e}")
        return False

    print("成本預算告警設定成功!")
    return True
# Set a monthly budget of $500 for SageMaker.
create_cost_alarm(500, 'Amazon SageMaker')
def create_comprehensive_dashboard(endpoint_name=None, variant_name='AllTraffic',
                                   region=None,
                                   log_group_name='/aws/bedrock/model-invocations',
                                   billing_services=('AmazonSageMaker', 'AmazonBedrock')):
    """Create the ``AI-Services-Comprehensive-Monitor`` CloudWatch dashboard.

    Widgets: SageMaker endpoint latency, Bedrock token usage (Logs Insights),
    SageMaker 4XX/5XX errors, and estimated charges per service.

    Args:
        endpoint_name: Endpoint used for the SageMaker metric dimensions.
            With ``None`` the rows carry no dimensions, as before, and are
            unlikely to show data.
        variant_name: Production variant for the ``VariantName`` dimension;
            ignored when ``endpoint_name`` is ``None``.
        region: Region for the metric and log widgets and the console URL.
            Defaults to the region of the module-level ``cloudwatch`` client.
        log_group_name: Log group queried by the token-usage widget.
        billing_services: ``ServiceName`` dimension values charted in the
            cost widget.
    """
    dashboard_name = 'AI-Services-Comprehensive-Monitor'

    if region is None:
        region = cloudwatch.meta.region_name

    endpoint_dimensions = []
    if endpoint_name is not None:
        endpoint_dimensions += ["EndpointName", endpoint_name]
        if variant_name is not None:
            endpoint_dimensions += ["VariantName", variant_name]

    def sagemaker_metric(metric_name, **rendering):
        """Build one AWS/SageMaker metric row, with optional rendering options."""
        row = ["AWS/SageMaker", metric_name, *endpoint_dimensions]
        if rendering:
            row.append(rendering)
        return row

    def billing_metric(service_code):
        """Build one EstimatedCharges row; dimensions go in as name/value pairs."""
        return ["AWS/Billing", "EstimatedCharges",
                "ServiceName", service_code, "Currency", "USD"]

    # NOTE(review): assumes the log events expose `inputTokens` /
    # `outputTokens` fields; confirm against the actual log schema.
    token_query = (
        f"SOURCE '{log_group_name}'\n"
        "| fields @timestamp, inputTokens, outputTokens\n"
        "| stats sum(inputTokens + outputTokens) as total_tokens by bin(1h)"
    )

    dashboard_body = {
        "widgets": [
            # SageMaker performance
            {
                "type": "metric",
                "properties": {
                    "title": "SageMaker 端點延遲",
                    "metrics": [
                        sagemaker_metric("ModelLatency", stat="Average"),
                        sagemaker_metric("ModelLatency", stat="p99"),
                    ],
                    "period": 300,
                    "region": region,
                },
            },
            # Bedrock token usage
            {
                "type": "log",
                "properties": {
                    "title": "Bedrock Token 使用趨勢",
                    "query": token_query,
                    "region": region,
                },
            },
            # Error counts
            {
                "type": "metric",
                "properties": {
                    "title": "服務錯誤率",
                    "metrics": [
                        sagemaker_metric("Invocation4XXErrors"),
                        sagemaker_metric("Invocation5XXErrors"),
                    ],
                    "period": 300,
                    "stat": "Sum",
                    "region": region,
                },
            },
            # Cost tracking
            {
                "type": "metric",
                "properties": {
                    "title": "每日成本",
                    "metrics": [billing_metric(code) for code in billing_services],
                    "period": 86400,
                    "stat": "Maximum",
                    # Billing metrics are only published in us-east-1.
                    "region": "us-east-1",
                },
            },
        ]
    }

    cloudwatch.put_dashboard(
        DashboardName=dashboard_name,
        DashboardBody=json.dumps(dashboard_body),
    )
    print("綜合儀表板創建成功!")
    print(f"訪問網址: https://console.aws.amazon.com/cloudwatch/home?region={region}#dashboards:name={dashboard_name}")
# Build the combined dashboard with the default settings.
create_comprehensive_dashboard()
def endpoint_health_check(endpoint_name, variant_name='AllTraffic',
                          max_latency_ms=500, max_errors=10):
    """Run a basic health check on a SageMaker endpoint and print a report.

    Four checks are evaluated over the last hour of 5-minute datapoints:
    endpoint is ``InService``; mean model latency is below
    ``max_latency_ms``; total 5XX errors are below ``max_errors``; and the
    endpoint reported invocation datapoints at all. A metric with no
    datapoints leaves its check failed.

    Args:
        endpoint_name: Endpoint to check.
        variant_name: Production variant used for the ``VariantName``
            dimension. Pass ``None`` to omit that dimension.
        max_latency_ms: Latency limit in milliseconds.
        max_errors: Limit for the total number of 5XX errors.

    Returns:
        ``True`` only if every check passed. ``False`` otherwise, including
        when the endpoint status cannot be retrieved.
    """
    checks = {
        'endpoint_status': False,
        'low_latency': False,
        'low_errors': False,
        'sufficient_capacity': False,
    }

    def stat_values(metric_name, statistic):
        """Return ``statistic`` of each 5-minute datapoint of the last hour."""
        end_time = datetime.now(timezone.utc)
        dimensions = [{'Name': 'EndpointName', 'Value': endpoint_name}]
        if variant_name is not None:
            dimensions.append({'Name': 'VariantName', 'Value': variant_name})
        response = cloudwatch.get_metric_statistics(
            Namespace='AWS/SageMaker',
            MetricName=metric_name,
            Dimensions=dimensions,
            StartTime=end_time - timedelta(hours=1),
            EndTime=end_time,
            Period=300,
            # Request exactly the statistic that is read below.
            Statistics=[statistic],
        )
        return [point[statistic] for point in response['Datapoints']]

    # Endpoint status
    try:
        response = sagemaker.describe_endpoint(EndpointName=endpoint_name)
    except Exception as e:
        print(f"無法獲取端點狀態: {e}")
        # Must be a real False: a non-empty dict would read as "healthy".
        return False
    checks['endpoint_status'] = response['EndpointStatus'] == 'InService'

    # Latency: ModelLatency is in microseconds, the limit is in milliseconds.
    latencies_us = stat_values('ModelLatency', 'Average')
    if latencies_us:
        avg_latency_ms = sum(latencies_us) / len(latencies_us) / 1000
        checks['low_latency'] = avg_latency_ms < max_latency_ms

    # Errors
    error_sums = stat_values('Invocation5XXErrors', 'Sum')
    if error_sums:
        checks['low_errors'] = sum(error_sums) < max_errors

    # Capacity (simplified): any invocation datapoint counts as a pass.
    if stat_values('Invocations', 'Sum'):
        checks['sufficient_capacity'] = True

    # Report
    print(f"\n端點健康檢查: {endpoint_name}")
    print("=" * 50)
    for check, status in checks.items():
        status_icon = "✓" if status else "✗"
        print(f"{status_icon} {check}: {'通過' if status else '失敗'}")

    return all(checks.values())
# Run the health check for the placeholder endpoint and keep the result.
is_healthy = endpoint_health_check('your-endpoint-name')